In [1]:
import io, time, json
import requests
from bs4 import BeautifulSoup
import pandas as pd
import urllib, urllib2
import re
import os
import numpy as np
import ftplib
from ftplib import FTP
import timeit
This downloads hourly data from the FTP server over a range of years and saves all of the file names/last update times in a list. The downloads can take some time, depending on how much data is being retrieved.
Some of the code below assumes that we only need to retrieve new or modified files. If you are retrieving this data for the first time, create an empty dataframe named already_downloaded with the columns file name and last updated.
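For reference, the loop below asks the server for each file's modification time with the FTP MDTM command. The reply is a status code followed by a UTC timestamp in YYYYMMDDHHMMSS form; a minimal sketch of the parsing step (the reply value here is made up):
# Example reply string from ftp.sendcmd('MDTM ' + name) -- the value is made up
reply = '213 20170103120532'
# Keep only the timestamp portion and parse it into a pandas Timestamp
last_updated = pd.to_datetime(reply.split()[-1], format='%Y%m%d%H%M%S')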
In [3]:
# Replace the filename with whatever csv stores already downloaded file info
path = os.path.join('EPA downloads', 'name_time 2015-2016.csv')
already_downloaded = pd.read_csv(path, parse_dates=['last updated'])
# Uncomment the line below to create an empty dataframe
# already_downloaded = pd.DataFrame(columns=['file name', 'last updated'])
In [3]:
already_downloaded.head()
Out[3]:
In [18]:
# Timestamp
start_time = timeit.default_timer()

name_time_list = []

# Open ftp connection and navigate to the correct folder
print 'Opening ftp connection'
ftp = FTP('ftp.epa.gov')
ftp.login()
ftp.cwd('/dmdnload/emissions/hourly/monthly')

for year in [2015, 2016, 2017]:
    print year
    year_str = str(year)
    print 'Change directory to', year_str
    try:
        ftp.cwd(year_str)
    except ftplib.all_errors as e:
        print e
        break

    # Use ftplib to get the list of filenames
    print 'Fetch filenames'
    fnames = ftp.nlst()

    # Create new directory path if it doesn't exist
    new_path = os.path.join('EPA downloads', year_str)
    try:
        os.mkdir(new_path)
    except OSError:
        pass

    # Look for files without _HLD in the name
    name_list = []
    time_list = []
    print 'Find filenames without _HLD and time last updated'
    for name in fnames:
        if '_HLD' not in name:
            try:
                # The ftp command "MDTM" asks what time a file was last modified
                # It returns a code and the date/time
                tm = pd.to_datetime(ftp.sendcmd('MDTM ' + name).split()[-1])
                # Keep the file if it isn't already downloaded,
                # or if the modification time has changed
                if name not in already_downloaded['file name'].values:
                    time_list.append(tm)
                    name_list.append(name)
                elif already_downloaded.loc[already_downloaded['file name'] == name, 'last updated'].values[0] != tm:
                    time_list.append(tm)
                    name_list.append(name)
            except ftplib.all_errors as e:
                print e
                # If ftp.sendcmd didn't work, assume the connection was lost
                # and reconnect before retrying
                ftp = FTP('ftp.epa.gov')
                ftp.login()
                ftp.cwd('/dmdnload/emissions/hourly/monthly')
                ftp.cwd(year_str)
                tm = ftp.sendcmd('MDTM ' + name)
                time_list.append(pd.to_datetime(tm.split()[-1]))
                name_list.append(name)

    # Store all filenames and update times
    print 'Store names and update times'
    name_time_list.extend(zip(name_list, time_list))

    # Download and store data
    print 'Downloading data'
    for name in name_list:
        try:
            with open(os.path.join('EPA downloads', year_str, name), 'wb') as f:
                ftp.retrbinary('RETR %s' % name, f.write)
        except ftplib.all_errors as e:
            print e
            # If the download failed, close the (possibly dead) connection,
            # reconnect, and try again
            try:
                ftp.quit()
            except ftplib.all_errors as e:
                print e
            ftp = FTP('ftp.epa.gov')
            ftp.login()
            ftp.cwd('/dmdnload/emissions/hourly/monthly')
            ftp.cwd(year_str)
            with open(os.path.join('EPA downloads', year_str, name), 'wb') as f:
                ftp.retrbinary('RETR %s' % name, f.write)

    print 'Download finished'
    print round((timeit.default_timer() - start_time)/60.0, 2), 'min so far'

    # Go back up a level on the ftp server
    ftp.cwd('..')

# Timestamp
elapsed = round((timeit.default_timer() - start_time)/60.0, 2)
In [19]:
print 'Data download completed in %s mins' %(elapsed)
In [120]:
name_time_df = pd.DataFrame(name_time_list, columns=['file name', 'last updated'])
In [121]:
name_time_df.head()
Out[121]:
In [122]:
len(name_time_df)
Out[122]:
In [124]:
path = os.path.join('EPA downloads', 'name_time 2015-2016.csv')
name_time_df.to_csv(path, index=False)
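Note that name_time_df only holds the files touched on this run. If the saved csv is meant to serve as the already_downloaded record for the next run (an assumption about how it gets reused), one option is to merge it with the previous record before saving; a sketch:
# Sketch (assumption): keep one row per file, preferring the newest update time,
# so the saved csv covers both previously and newly downloaded files
combined = pd.concat([already_downloaded, name_time_df])
combined = (combined.sort_values('last updated')
                    .drop_duplicates('file name', keep='last'))
combined.to_csv(path, index=False)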
In [16]:
import csv
import zipfile
import StringIO
from collections import Counter
Check the column names in every downloaded file by opening each zip and reading the header row of its csv. This takes ~100 ms per file.
In [23]:
base_path = 'EPA downloads'
num_cols = {}
col_names = {}
for year in range(2001, 2017):
    n_cols_list = []
    col_name_list = []
    path = os.path.join(base_path, str(year))
    fnames = os.listdir(path)
    for name in fnames:
        csv_name = name.split('.')[0] + '.csv'
        fullpath = os.path.join(path, name)
        filehandle = open(fullpath, 'rb')
        zfile = zipfile.ZipFile(filehandle)
        # Read the csv inside the zip into an in-memory buffer
        data = StringIO.StringIO(zfile.read(csv_name))
        reader = csv.reader(data)
        # The first row holds the column names
        columns = reader.next()
        # Add the column names to the large list
        col_name_list.extend(columns)
        # Add the number of columns to the list
        n_cols_list.append(len(columns))
    col_names[year] = Counter(col_name_list)
    num_cols[year] = Counter(n_cols_list)
From the table below, recent years always include the units after an emission name; before 2009 some files include the units and some don't. UNITID appears consistently in all years, but UNIT_ID (a different field) was added after 2008. A quick check for inconsistent columns is sketched after the tables below.
In [24]:
pd.DataFrame(col_names)
Out[24]:
In [26]:
pd.DataFrame(col_names).index
Out[26]:
In [25]:
pd.DataFrame(num_cols)
Out[25]:
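As a quick check (not part of the original notebook), the column names that appear in some years but not in all of them can be listed directly from the col_names counters:
# Column names present in every year vs. present in at least one year
every_year = set.intersection(*(set(c) for c in col_names.values()))
any_year = set.union(*(set(c) for c in col_names.values()))
# Names that will need to be standardized or handled specially
sorted(any_year - every_year)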
Use joblib to import and clean the files in parallel.
Joblib on Windows requires the if __name__ == '__main__': guard, and in a Jupyter notebook the parallelized function needs to be imported from an external script (a sketch of such a script follows the column name mapping below). I probably should have done the parallel part at a higher level - the longest step is saving the csv files. Could use this method - disabling a check - to speed up the process.
Joblib needs to be at least version 0.10, which is only available through pip - I got errors when using the version installed by conda.
Create a dictionary mapping column names: any column name matching a key on the left is replaced by the corresponding value on the right.
In [21]:
col_name_map = {'CO2_MASS': 'CO2_MASS (tons)',
                'CO2_RATE': 'CO2_RATE (tons/mmBtu)',
                'GLOAD': 'GLOAD (MW)',
                'HEAT_INPUT': 'HEAT_INPUT (mmBtu)',
                'NOX_MASS': 'NOX_MASS (lbs)',
                'NOX_RATE': 'NOX_RATE (lbs/mmBtu)',
                'SLOAD': 'SLOAD (1000lb/hr)',
                'SLOAD (1000 lbs)': 'SLOAD (1000lb/hr)',
                'SO2_MASS': 'SO2_MASS (lbs)',
                'SO2_RATE': 'SO2_RATE (lbs/mmBtu)'}
In [22]:
from joblib import Parallel, delayed
from scripts import import_clean_epa
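The import_clean_epa script itself isn't part of this notebook. A minimal sketch of what it might contain - assuming it just reads one zipped csv, applies col_name_map, and returns a dataframe (the real script may do more cleaning), and assuming a pandas version that can read zip-compressed csvs directly:
# Sketch of the function living in the external scripts module (not the original code)
import os
import pandas as pd


def import_clean_epa(path, name, col_name_map):
    """Read one zipped EPA csv and standardize its column names."""
    # Each zip archive holds a single monthly csv, so pandas can read it directly
    df = pd.read_csv(os.path.join(path, name), compression='zip')
    # Rename columns like 'CO2_MASS' to 'CO2_MASS (tons)' so all years match
    df = df.rename(columns=col_name_map)
    return df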
In [23]:
if __name__ == '__main__':
    start_time = timeit.default_timer()
    base_path = 'EPA downloads'

    for year in range(2015, 2017):
        print 'Starting', str(year)
        path = os.path.join(base_path, str(year))
        fnames = os.listdir(path)

        # Import and clean each monthly file in parallel
        df_list = Parallel(n_jobs=-1)(delayed(import_clean_epa)(path, name, col_name_map)
                                      for name in fnames)

        print 'Combining data'
        df = pd.concat(df_list)

        print 'Saving file'
        path_out = os.path.join('Clean data', 'EPA emissions',
                                'EPA emissions ' + str(year) + '.csv')
        df.to_csv(path_out, index=False)
        print round((timeit.default_timer() - start_time)/60.0, 2), 'min so far'

    # Timestamp
    elapsed = round((timeit.default_timer() - start_time)/60.0, 2)